package gossip

import (
	"encoding/json"
	"goqk/core/events"
	"log"
	"os"
	"strings"
	"sync"

	"github.com/hashicorp/memberlist"
	"github.com/pborman/uuid"
)

// var (
// 	mtx        sync.RWMutex
// 	members    = flag.String("members", "", "comma separated list of members")
// 	port       = flag.Int("port", 4001, "http port")
// 	items      = map[string]string{}
// 	broadcasts *memberlist.TransmitLimitedQueue
// )

// broadcast is one queued gossip message. Add and Del build values of this
// type and hand them to the broadcast queue; the methods below expose the
// payload and the completion hook.
type broadcast struct {
	// msg is the payload returned verbatim by Message.
	msg []byte
	// notify is optional; when non-nil it is closed by Finished.
	notify chan<- struct{}
}

// type delegate struct {
// }

// Gossiper propagates transactions and blocks between nodes. It keeps a
// key/value map (Items) and is installed as the memberlist delegate by start.
type Gossiper struct {
	// mtx guards Items.
	mtx sync.RWMutex
	// members and port are not read anywhere in the visible code; Run and
	// start take these values as arguments instead.
	members string
	port    string
	// Items maps a key to its raw value. Run allocates it; the methods of
	// this type access it while holding mtx.
	Items map[string][]byte
	// broadcasts is the outgoing message queue; it is assigned by start.
	broadcasts *memberlist.TransmitLimitedQueue
}

// update is one change to the item map as it travels between nodes, encoded
// as JSON. Add and Del each send a one-element array of updates, and
// NotifyMsg applies received arrays.
type update struct {
	// Action is "add" or "del"; NotifyMsg ignores any other value.
	Action string
	// Data holds the affected entries. For "add" every key/value pair is
	// stored; for "del" only the keys matter.
	Data map[string][]byte
}

// Invalidates reports whether this broadcast makes other obsolete. It always
// returns false and never inspects other.
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
	return false
}

// Message returns the payload of this broadcast. The stored slice is
// returned as-is, not copied.
func (b *broadcast) Message() []byte {
	return b.msg
}

// Finished signals completion by closing the notify channel when one was
// supplied. With a nil notify channel it does nothing.
func (b *broadcast) Finished() {
	done := b.notify
	if done == nil {
		return
	}
	close(done)
}

// NodeMeta returns an empty, non-nil byte slice: this node carries no
// metadata. The limit argument is ignored.
func (g *Gossiper) NodeMeta(limit int) []byte {
	return []byte{}
}

// NotifyMsg handles a message received from another node.
//
// The first byte of b is a type tag. Only 'b' is handled: the remainder of
// the message is decoded as a JSON array of updates and each one is applied
// to g.Items ("add" stores every key/value pair in Data, "del" removes every
// key in Data, any other action is ignored). Afterwards an event whose Flag
// is the tag byte is published. Empty messages, other tags and payloads that
// fail to decode are dropped silently.
func (g *Gossiper) NotifyMsg(b []byte) {
	if len(b) == 0 {
		return
	}

	switch b[0] {
	// data
	case 'b':
		var updates []*update
		if err := json.Unmarshal(b[1:], &updates); err != nil {
			return
		}

		g.mtx.Lock()
		for _, u := range updates {
			// A JSON null element decodes to a nil pointer. Skip it so a
			// malformed message from a peer cannot panic this node.
			if u == nil {
				continue
			}
			switch u.Action {
			case "add":
				for k, v := range u.Data {
					g.Items[k] = v
				}
			case "del":
				for k := range u.Data {
					delete(g.Items, k)
				}
			}
		}
		g.mtx.Unlock()

		events.Publish(events.Event{Sender: "Gossiper", Flag: string(b[0]), Context: nil})
	}
}

// GetBroadcasts returns the queued messages that should be sent next, as
// selected by the broadcast queue for the given per-message overhead and
// total byte limit.
//
// start registers g as the delegate before it creates the queue, so this
// method can be reached while g.broadcasts is still nil. In that case there
// is nothing to send and nil is returned instead of calling through a nil
// queue.
func (g *Gossiper) GetBroadcasts(overhead, limit int) [][]byte {
	q := g.broadcasts
	if q == nil {
		return nil
	}
	return q.GetBroadcasts(overhead, limit)
}

// LocalState returns the whole item map encoded as JSON, for exchange with
// a remote node. The join argument is ignored.
//
// The read lock is held for the entire encoding step: the map is iterated
// in place, so releasing the lock earlier would let a concurrent Add, Del or
// NotifyMsg modify it mid-iteration. A nil map encodes as "null". If
// encoding fails the error is logged and nil is returned.
func (g *Gossiper) LocalState(join bool) []byte {
	g.mtx.RLock()
	defer g.mtx.RUnlock()

	b, err := json.Marshal(g.Items)
	if err != nil {
		log.Println(err)
		return nil
	}
	return b
}

// MergeRemoteState merges a remote node's JSON-encoded item map into
// g.Items. It only acts when join is true and buf is non-empty; a buffer
// that does not decode as a JSON object of byte values is ignored. Remote
// entries overwrite local entries with the same key, and local keys absent
// from the remote state are kept. An event is published after the merge.
//
// NOTE(review): the published Flag is the first byte of buf, i.e. the first
// character of the JSON document rather than a message tag - confirm that
// subscribers expect this.
func (g *Gossiper) MergeRemoteState(buf []byte, join bool) {
	if !join || len(buf) == 0 {
		return
	}

	var remote map[string][]byte
	if json.Unmarshal(buf, &remote) != nil {
		return
	}

	g.mtx.Lock()
	for key, value := range remote {
		g.Items[key] = value
	}
	g.mtx.Unlock()

	flag := string(buf[0])
	events.Publish(events.Event{Sender: "Gossiper", Flag: flag, Context: nil})
}

// Add stores val under key locally, queues an "add" update for the other
// nodes and publishes an event whose Flag is dataType.
//
// The queued message is dataType followed by the JSON-encoded update array.
// The local map is updated before encoding, so it keeps the new value even
// if encoding fails; in that case the error is returned and nothing is
// queued or published.
//
// NOTE(review): NotifyMsg looks only at the first byte of a message and
// decodes from the second byte on, so receivers apply the update only when
// dataType is exactly "b".
func (g *Gossiper) Add(key string, val []byte, dataType string) error {
	g.mtx.Lock()
	g.Items[key] = val
	g.mtx.Unlock()

	change := update{
		Action: "add",
		Data:   map[string][]byte{key: val},
	}
	payload, err := json.Marshal([]*update{&change})
	if err != nil {
		return err
	}

	// Broadcast the data: type tag first, JSON payload after it.
	frame := make([]byte, 0, len(dataType)+len(payload))
	frame = append(frame, dataType...)
	frame = append(frame, payload...)
	g.broadcasts.QueueBroadcast(&broadcast{msg: frame})

	events.Publish(events.Event{Sender: "Gossiper", Flag: dataType, Context: nil})
	return nil
}

// Del removes key from the local map and queues a "del" update for the
// other nodes. Unlike Add it publishes no event.
//
// The queued message is dataType followed by the JSON-encoded update array;
// the update carries the key with an empty value. The local delete happens
// before encoding, so it stands even if encoding fails; in that case the
// error is returned and nothing is queued.
func (g *Gossiper) Del(key string, dataType string) error {
	g.mtx.Lock()
	delete(g.Items, key)
	g.mtx.Unlock()

	change := update{
		Action: "del",
		Data:   map[string][]byte{key: {}},
	}
	payload, err := json.Marshal([]*update{&change})
	if err != nil {
		return err
	}

	// Type tag first, JSON payload after it.
	frame := make([]byte, 0, len(dataType)+len(payload))
	frame = append(frame, dataType...)
	frame = append(frame, payload...)
	g.broadcasts.QueueBroadcast(&broadcast{msg: frame})

	return nil
}

// Get returns the value stored under key, or nil when the key is absent.
// The returned slice is the stored one, not a copy.
func (g *Gossiper) Get(key string) []byte {
	g.mtx.RLock()
	defer g.mtx.RUnlock()
	return g.Items[key]
}

// start creates the local memberlist node with g as its delegate and, when
// members is non-empty, joins the cluster through the listed peers.
//
// members is a comma-separated list of existing node addresses; it is empty
// for the first node of a cluster. port is the memberlist bind port. The
// error from memberlist.Create or from Join is returned; a Join error is
// also logged here.
func (g *Gossiper) start(members string, port int) error {
	// The hostname only makes the node name readable; the UUID suffix is
	// what distinguishes nodes, so a failed lookup (empty hostname) is
	// tolerated.
	hostname, _ := os.Hostname()

	c := memberlist.DefaultLocalConfig()
	c.Delegate = g
	c.BindPort = port
	c.Name = hostname + "-" + uuid.NewUUID().String()

	// Create the gossip network node.
	m, err := memberlist.Create(c)
	if err != nil {
		return err
	}

	// Install the broadcast queue before joining. g is already registered
	// as the delegate, so once peers are known its GetBroadcasts may be
	// invoked; and Add/Del dereference the queue regardless of whether the
	// join succeeded. Assigning it only after Join left it nil in both
	// situations.
	g.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes:       m.NumMembers,
		RetransmitMult: 3,
	}

	// The first node has no members to contact; every later node does.
	if len(members) > 0 {
		if _, err := m.Join(strings.Split(members, ",")); err != nil {
			log.Println(err)
			return err
		}
	}

	node := m.LocalNode()
	log.Printf("本地成员%s:%d\n", node.Addr, node.Port)
	return nil
}

// Run starts the gossip service and then blocks forever.
//
// It replaces g.Items with a fresh empty map and calls start with members
// and port; a start error is logged, not returned. If ch is non-nil, the
// value 1 is sent on it after the start attempt, whether or not the attempt
// succeeded. The final empty select never returns, so a caller that needs
// to continue must invoke Run in a separate goroutine.
func (g *Gossiper) Run(members string, port int, ch chan int) {
	g.Items = map[string][]byte{}

	err := g.start(members, port)
	if err != nil {
		log.Println(err)
	}

	if ch != nil {
		ch <- 1
	}

	// Park this goroutine for good.
	select {}

	// Disabled HTTP front end:
	// http.HandleFunc("/add", addHandler)
	// http.HandleFunc("/del", delHandler)
	// http.HandleFunc("/get", getHandler)
	// log.Printf("Listening on :%d\n", *port)
	// if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
	// 	log.Println(err)
	// }
}
